#include "net/channel_core.h"
#include "packet/packet.h"
#include "packet/packet_factory.h"
#include "controller/controller.h"
#include "callback/callback.h"
#include "log/log.h"
#include "thread/thread_manage.h"
#include "net/flow_status.h"
#include "net/channel_post.h"

// Size in bytes that m_once_recv_str is resized to; that string is the buffer
// handed to each async_read_some() call in startRecvAsync().
static uint32_t static_onece_recvbuffer_size = 2 * 1024;
// Delay in milliseconds armed by noKeepaliveTimer(); when it fires,
// noKeepaliveTimerCb() closes the channel.
static uint32_t static_client_channel_ms_timeout = 4000;

namespace weilin
{
	// Constructs the non-client flavour of the channel (m_is_client = false)
	// around a socket supplied by the caller. The io_service and the remote
	// endpoint are taken from that socket and the channel starts out in the
	// SOCKET_CONNECTED state. m_flow_status_sp is not initialised here.
	RpcChannelCore::RpcChannelCore(
		const ChannelType& channel_type,
		const uint32_t& channel_seq,
		const socket_shared_ptr& socket_sp,
		const bool& is_keep_alive,
		const uint32_t& ms_timeout,
		const call_back_shared_ptr& call_back_sp) :
		m_channel_type(channel_type),
		m_channel_seq(channel_seq),
		m_io_service(socket_sp->get_io_service()),
		m_socket_sp(socket_sp),
		m_timer_sp(new (std::nothrow)boost::asio::deadline_timer(m_io_service)),
		m_channel_post_sp(boost::make_shared<ChannelPost>(call_back_sp)),
		m_is_keep_alive(is_keep_alive),
		m_ms_timeout(ms_timeout),
		m_endpoint(m_socket_sp->remote_endpoint()),
		m_recv_packet_sp(PacketFactory::Instance()->GetPackect(channel_type, PACKET_REQUEST)),
		m_is_client(false),
		m_socket_status(SOCKET_CONNECTED),
		m_is_sending(false),
		m_connect_seq(0)
	{
		// Human-readable "ip:port" of the peer, exposed via GetRemoteIpPort().
		m_remote_ip_port = m_endpoint.address().to_string();
		m_remote_ip_port += ":";
		m_remote_ip_port += boost::lexical_cast<string>(m_endpoint.port());

		// Fixed-size scratch buffer reused for every async_read_some().
		m_once_recv_str.resize(static_onece_recvbuffer_size, '\0');

		LOG(INFO) << "RpcChannelCore gouzao";
	}

	// Constructs the client flavour of the channel (m_is_client = true).
	// A fresh socket and timer are created on the io_service obtained from
	// ThreadManage, the target endpoint is built from remote_ip (parsed as an
	// IPv4 address) and port, and the channel starts in SOCKET_IDLE; nothing is
	// connected until AsyncConnect() is called.
	RpcChannelCore::RpcChannelCore(
		const ChannelType& channel_type,
		const string& remote_ip,
		const uint16_t& port,
		const uint32_t& ms_timeout,
		const call_back_shared_ptr& call_back_sp) :
		m_channel_type(channel_type),
		m_channel_seq(0),
		m_io_service(ThreadManage::Instance()->GetSocketIoService()),
		m_socket_sp(new (std::nothrow) boost::asio::ip::tcp::socket(m_io_service)),
		m_timer_sp(new (std::nothrow) boost::asio::deadline_timer(m_io_service)),
		m_channel_post_sp(boost::make_shared<ChannelPost>(call_back_sp)),
		m_is_keep_alive(true),
		m_ms_timeout(ms_timeout),
		m_endpoint(boost::asio::ip::address_v4::from_string(remote_ip), port),
		m_recv_packet_sp(PacketFactory::Instance()->GetPackect(channel_type, PACKET_REQUEST)),
		m_is_client(true),
		m_socket_status(SOCKET_IDLE),
		m_is_sending(false),
		m_connect_seq(0),
		m_flow_status_sp(boost::make_shared<FlowStatus>())
	{
		// Human-readable "ip:port" of the peer, exposed via GetRemoteIpPort().
		m_remote_ip_port = m_endpoint.address().to_string();
		m_remote_ip_port += ":";
		m_remote_ip_port += boost::lexical_cast<string>(m_endpoint.port());

		// Fixed-size scratch buffer reused for every async_read_some().
		m_once_recv_str.resize(static_onece_recvbuffer_size, '\0');

		LOG(INFO) << "RpcChannelCore gouzao";
	}

	// Destructor: performs no explicit cleanup here, it only emits a log line.
	RpcChannelCore::~RpcChannelCore()
	{
		LOG(INFO) << "RpcChannelCore xigou";
	}

	// Public entry point for connecting: the real work (statrtAsyncConnect) is
	// posted to the channel's io_service instead of running on the caller's
	// thread. The bound shared_from_this() keeps the channel alive until the
	// posted call has run.
	void RpcChannelCore::AsyncConnect()
	{
		m_io_service.post(
			boost::bind(&RpcChannelCore::statrtAsyncConnect, shared_from_this()));
	}

	// Starts one connect attempt towards m_endpoint.
	// Does nothing for non-client channels, and only proceeds from the
	// SOCKET_IDLE or SOCKET_COLOSED states. Each attempt gets a fresh
	// m_connect_seq so that handleAsyncConnectCb can ignore completions that
	// belong to an older attempt, and a timer of m_ms_timeout milliseconds is
	// armed with handleAsyncConnectTimerCheck as its handler.
	void RpcChannelCore::statrtAsyncConnect()
	{
		if (!m_is_client)
		{
			return;
		}

		const bool can_start = (m_socket_status == SOCKET_IDLE || m_socket_status == SOCKET_COLOSED);
		if (!can_start)
		{
			LOG(INFO) << "m_socket_status is error: " << m_socket_status;
			return;
		}

		++m_connect_seq;
		m_socket_status = SOCKET_CONNECTING;

		// close() shuts the socket, so it has to be reopened before reconnecting.
		if (!m_socket_sp->is_open())
		{
			m_socket_sp->open(boost::asio::ip::tcp::v4());
		}

		// Arm the connect timeout.
		m_timer_sp->cancel();
		m_timer_sp->expires_from_now(boost::posix_time::millisec(m_ms_timeout));
		m_timer_sp->async_wait(
			boost::bind(&RpcChannelCore::handleAsyncConnectTimerCheck, shared_from_this(), _1));

		// Kick off the connect itself, tagged with the current attempt number.
		m_socket_sp->async_connect(
			m_endpoint,
			boost::bind(&RpcChannelCore::handleAsyncConnectCb, shared_from_this(), m_connect_seq, _1));

		// Tell the owner that a connect attempt is in flight.
		m_channel_post_sp->PostSocketStatus(shared_from_this(), SOCKET_CONNECTING);
	}

	// Public entry point for reconnecting: posts startAsyncReConnect to the
	// channel's io_service rather than running it on the caller's thread.
	void RpcChannelCore::AsyncReConnect()
	{
		m_io_service.post(
			boost::bind(&RpcChannelCore::startAsyncReConnect, shared_from_this()));
	}
	// Schedules a new connect attempt m_ms_timeout milliseconds from now.
	// Only applies to client channels that are currently SOCKET_COLOSED; in
	// every other case the call is silently ignored.
	// NOTE(review): the wait handler is bound without the error-code
	// placeholder, so statrtAsyncConnect also runs when the wait is cancelled,
	// not only when the delay elapses -- confirm this is intended.
	void RpcChannelCore::startAsyncReConnect()
	{
		if (!m_is_client || m_socket_status != SOCKET_COLOSED)
		{
			return;
		}

		m_timer_sp->cancel();
		m_timer_sp->expires_from_now(boost::posix_time::millisec(m_ms_timeout));
		m_timer_sp->async_wait(
			boost::bind(&RpcChannelCore::statrtAsyncConnect, shared_from_this()));
	}

	// Completion handler for async_connect.
	// @param connect_seq  attempt number captured when the connect was started
	// @param err          result of the connect
	// Completions are ignored unless the channel is still SOCKET_CONNECTING and
	// connect_seq matches the latest attempt. A failed connect is only logged:
	// the state stays SOCKET_CONNECTING and no status is posted from here. On
	// success the socket options are applied, sending and receiving are
	// started, and SOCKET_CONNECTED is posted to the owner.
	void RpcChannelCore::handleAsyncConnectCb(uint32_t connect_seq, const boost::system::error_code& err)
	{
		const bool is_current_attempt = (m_socket_status == SOCKET_CONNECTING && m_connect_seq == connect_seq);
		if (!is_current_attempt)
		{
			return;
		}

		if (err)
		{
			LOG(INFO) << "connect error: " << err.value() << ", message: " << err.message() << std::endl;
			return;
		}

		m_socket_status = SOCKET_CONNECTED;

		// Socket tuning: no Nagle delay, 2 MiB send/receive buffers, TCP keep-alive.
		uint32_t socket_buffer_bytes = 1024 * 1024 * 2;
		m_socket_sp->set_option(boost::asio::ip::tcp::no_delay(true));
		m_socket_sp->set_option(boost::asio::socket_base::send_buffer_size(socket_buffer_bytes));
		m_socket_sp->set_option(boost::asio::socket_base::receive_buffer_size(socket_buffer_bytes));
		m_socket_sp->set_option(boost::asio::socket_base::keep_alive(true));

		// Start the send and receive chains on the new connection.
		startSendAsync();
		startRecvAsync();

		// Tell the owner that the channel is usable.
		m_channel_post_sp->PostSocketStatus(shared_from_this(), SOCKET_CONNECTED);
	}

	// Handler for the connect-timeout timer armed in statrtAsyncConnect.
	// @param err  result of the timer wait
	// Nothing happens if the channel is already SOCKET_CONNECTED or if the wait
	// was cancelled. Otherwise the channel is closed (without the generic
	// "closed" callback) and SOCKET_CONNECTING_TIMEOUT is posted to the owner.
	void RpcChannelCore::handleAsyncConnectTimerCheck(const boost::system::error_code& err)
	{
		if (m_socket_status == SOCKET_CONNECTED)
		{
			//m_timer_sp->expires_at(boost::posix_time::pos_infin);
			return;
		}

		// A cancelled wait (the timer was cancelled or re-armed) is not a timeout.
		// Compare against Asio's portable error constant instead of the literal
		// 125, which is only the Linux errno value for "operation canceled".
		if (err == boost::asio::error::operation_aborted)
		{
			return;
		}
		else if (err)
		{
			LOG(INFO) << "connect error: " << err.value() << ", message: " << err.message() << std::endl;
		}

		close(false);

		//call back
		m_channel_post_sp->PostSocketStatus(shared_from_this(), SOCKET_CONNECTING_TIMEOUT);
	}

	void RpcChannelCore::Stop()
	{
		wait_event_shared_ptr event_sp = boost::make_shared<WaitEvent>();
		m_io_service.post(boost::bind(&RpcChannelCore::stop, shared_from_this(), event_sp));
		event_sp->Wait();
	}

	// Returns the channel sequence number given to the constructor
	// (the client-side constructor always stores 0).
	const uint32_t& RpcChannelCore::GetSeq()
	{
		return m_channel_seq;
	}

	// Returns the peer address as the "ip:port" string built in the constructor.
	const string& RpcChannelCore::GetRemoteIpPort()
	{
		return m_remote_ip_port;
	}

	// Reports whether the channel's FlowStatus considers it busy.
	// Non-client channels always report false; they are constructed without a
	// FlowStatus object, so it must not be dereferenced for them.
	bool RpcChannelCore::IsBusy()
	{
		if (m_is_client)
		{
			return m_flow_status_sp->IsBusy();
		}
		return false;
	}

	// io_service-side half of Stop(): closes the channel without posting the
	// SOCKET_COLOSED status callback, then signals the event that Stop() is
	// waiting on.
	// @param event_sp  event created by Stop() and signalled once close() returns
	void RpcChannelCore::stop(const wait_event_shared_ptr& event_sp)
	{
		close(false);
		event_sp->Signal();
	}
	// Closes the socket and moves the channel to SOCKET_COLOSED.
	// @param is_need_callback  when true, SOCKET_COLOSED is posted to the owner
	// Calling this on an already closed channel is a no-op. Client channels
	// drop all pending controllers, other channels drop all pending packets.
	// The disconnect log line is written only for keep-alive channels; the
	// previous code had a stray ';' after both keep-alive checks, so the
	// condition was ignored and the line was always logged.
	void RpcChannelCore::close(const bool& is_need_callback)
	{
		if (m_socket_status == SOCKET_COLOSED)
		{
			return;
		}

		if (m_socket_sp->is_open())
		{
			m_socket_sp->close();
		}
		if (m_is_client == true)
		{
			m_channel_post_sp->ClearAllController();
			if (m_is_keep_alive == true)
			{
				LOG(INFO) << "connect to " << m_endpoint.address().to_string() << " : " << m_endpoint.port() << " fail " << std::endl;
			}
		}
		else
		{
			m_channel_post_sp->ClearAllPacket();
			if (m_is_keep_alive == true)
			{
				LOG(INFO) << "client " << m_endpoint.address().to_string() << " : " << m_endpoint.port() << " is disconnect" << std::endl;
			}
		}

		m_socket_status = SOCKET_COLOSED;
		if (is_need_callback == true)
		{
			//call back
			m_channel_post_sp->PostSocketStatus(shared_from_this(), SOCKET_COLOSED);
		}
	}

	// Public entry point that starts the receive loop on the io_service.
	// For channels that are not keep-alive it additionally posts
	// noKeepaliveTimer, which arms the timer that later closes the channel.
	void RpcChannelCore::StartAsyncRecv()
	{
		m_io_service.post(
			boost::bind(&RpcChannelCore::startRecvAsync, shared_from_this()));

		if (!m_is_keep_alive)
		{
			m_io_service.post(
				boost::bind(&RpcChannelCore::noKeepaliveTimer, shared_from_this()));
		}
	}

	// Public entry point for sending a controller: hands it to
	// asyncSendController on the channel's io_service.
	// @param controller_sp  controller to send; a copy is bound into the posted call
	void RpcChannelCore::AsyncSendController(const controller_shared_ptr& controller_sp)
	{
		m_io_service.post(
			boost::bind(&RpcChannelCore::asyncSendController, shared_from_this(), controller_sp));
	}

	// Public entry point for sending a bare packet: hands it to asyncSendPacket
	// on the channel's io_service.
	// @param packet_sp  packet to send; a copy is bound into the posted call
	void RpcChannelCore::AsyncSendPacket(const packet_shared_ptr& packet_sp)
	{
		m_io_service.post(
			boost::bind(&RpcChannelCore::asyncSendPacket, shared_from_this(), packet_sp));
	}

	void RpcChannelCore::asyncSendController(const controller_shared_ptr& controller_sp)
	{
		if (m_socket_status != SOCKET_CONNECTED)
		{
			m_channel_post_sp->PostControllerSendRecvStatus(controller_sp, SEND_FAIL_FOR_CLOSE);
			return;
		}

		if (m_channel_post_sp->InsertController(controller_sp) == false ||  m_channel_post_sp->InsertPacket(controller_sp->GetSendPacket()) == false)
		{
			m_channel_post_sp->PostControllerSendRecvStatus(controller_sp, SEND_FAIL_FOR_SEQ);
			return;
		}

		//timer
		controller_sp->m_controller_timer_sp.reset(new (std::nothrow) boost::asio::deadline_timer(m_io_service));
		controller_sp->m_controller_timer_sp->cancel();
		controller_sp->m_controller_timer_sp->expires_from_now(boost::posix_time::millisec(m_ms_timeout));
		controller_sp->m_controller_timer_sp->async_wait(boost::bind(&RpcChannelCore::asyncSendControllerTimeout, shared_from_this(), controller_weak_ptr(controller_sp), _1));
		startSendAsync();
	}

	// io_service-side half of AsyncSendPacket().
	// @param packet_sp  packet to queue for sending
	// Reports SEND_ERROR_FOR_CLOSE if the channel is not connected and
	// SEND_ERROR_FOR_SEQ if the ChannelPost refuses the packet; otherwise the
	// send chain is kicked.
	void RpcChannelCore::asyncSendPacket(const packet_shared_ptr& packet_sp)
	{
		if (m_socket_status != SOCKET_CONNECTED)
		{
			m_channel_post_sp->PostPacketSendStatus(packet_sp, SEND_ERROR_FOR_CLOSE);
			return;
		}

		if (m_channel_post_sp->InsertPacket(packet_sp) == false)
		{
			// The failure path used to allocate a Controller here that was
			// never used; that dead allocation has been removed.
			m_channel_post_sp->PostPacketSendStatus(packet_sp, SEND_ERROR_FOR_SEQ);
			return;
		}
		startSendAsync();
	}

	void RpcChannelCore::startSendAsync()
	{
		//m_now_time = boost::posix_time::microsec_clock::local_time();
		if (m_is_sending == true)
		{
			return;
		}
		packet_shared_ptr send_packet_sp;
		if (m_channel_post_sp->GetFirstSendPacket(send_packet_sp) == false)
		{
			return;
		}
		m_is_sending = true;
		string sendDataStr;
		send_packet_sp->Tobuffer(sendDataStr);
		LOG(INFO) << FILE_LINE_FUNCTION << "weilin " << sendDataStr << "SEND_RET";

		boost::asio::async_write(*m_socket_sp, boost::asio::buffer(sendDataStr.data(), sendDataStr.length()), boost::bind(&RpcChannelCore::handleSendAsyncCb, shared_from_this(), send_packet_sp, _1, _2));
	}

	// Completion handler for the async_write issued by startSendAsync.
	// @param packet_sp          packet that was being written (not used here)
	// @param err                result of the write
	// @param bytes_transferred  number of bytes written (not used here)
	// Clears the in-flight flag, then either continues with the next queued
	// packet or, on error, logs and closes the channel with a status callback.
	void RpcChannelCore::handleSendAsyncCb(const packet_shared_ptr& packet_sp, const boost::system::error_code &err, std::size_t bytes_transferred)
	{
		m_is_sending = false;

		if (!err)
		{
			startSendAsync();
			return;
		}

		LOG(INFO) << "weilin " << FILE_LINE_FUNCTION << err.value() << ", message: " << err.message();
		close(true);
	}

	// Handler for the per-controller timer armed in asyncSendController.
	// @param controller_wp  weak reference to the controller the timer belongs to
	// @param ec             result of the timer wait (only logged)
	// Does nothing if the controller no longer exists or is no longer
	// registered under its packet's seq. Otherwise the controller is removed and
	// reported as SEND_FAIL_FOR_TIMEOUT when its packet was still queued, or as
	// RECV_FAIL_FOR_TIMEOUT when the packet was no longer in the queue. The
	// matching FlowStatus counter is updated on the thread pool.
	void RpcChannelCore::asyncSendControllerTimeout(const controller_weak_ptr& controller_wp, const boost::system::error_code& ec)
	{
		if (ec)
		{
			LOG(INFO) << "connect error: " << ec.value() << ", message: " << ec.message();
		}
		
		controller_shared_ptr controller_sp = controller_wp.lock();
		if (controller_sp == NULL)
		{
			return;
		}

		uint32_t seq = controller_sp->GetSendPacket()->GetSeq();
		if (m_channel_post_sp->FindRemoveControllerBySeq(seq, controller_sp) == false)
		{
			return;
		}
		packet_shared_ptr packet_sp;
		if (m_channel_post_sp->FindRemovePacketBySeq(seq, packet_sp) == true)
		{
			m_channel_post_sp->PostControllerSendRecvStatus(controller_sp, SEND_FAIL_FOR_TIMEOUT);
			// m_flow_status_sp is only created by the client-side constructor;
			// skip the bookkeeping instead of binding a null pointer.
			if (m_flow_status_sp)
			{
				ThreadManage::Instance()->TreadPoolPost(boost::bind(&FlowStatus::RecordSentTimeout, m_flow_status_sp));
			}
		}
		else
		{
			m_channel_post_sp->PostControllerSendRecvStatus(controller_sp, RECV_FAIL_FOR_TIMEOUT);
			// Same guard as above for channels without a FlowStatus.
			if (m_flow_status_sp)
			{
				ThreadManage::Instance()->TreadPoolPost(boost::bind(&FlowStatus::RecordRecvTimeout, m_flow_status_sp));
			}
		}
	}

	// Issues one async_read_some into the reusable m_once_recv_str buffer.
	// Only done while the channel is SOCKET_CONNECTED; handleRecvAsyncCb
	// processes the data and calls back here to keep the receive loop going.
	void RpcChannelCore::startRecvAsync()
	{
		if (m_socket_status == SOCKET_CONNECTED)
		{
			m_socket_sp->async_read_some(
				boost::asio::buffer(&m_once_recv_str[0], m_once_recv_str.length()),
				boost::bind(&RpcChannelCore::handleRecvAsyncCb, shared_from_this(), _1, _2));
		}
	}

	// Completion handler for the async_read_some issued by startRecvAsync.
	// @param err                result of the read
	// @param bytes_transferred  number of valid bytes in m_once_recv_str
	// A read error or an empty read closes the channel. Otherwise the received
	// bytes are fed one at a time into the packet being assembled: a parse
	// failure closes the channel, a completed packet is posted to the
	// ChannelPost and replaced by a fresh packet object, and any remaining
	// bytes continue into that new packet. Finally the next read is started.
	void RpcChannelCore::handleRecvAsyncCb(const boost::system::error_code &err, std::size_t bytes_transferred)
	{
		if (err || bytes_transferred == 0)
		{
			LOG(INFO) << FILE_LINE_FUNCTION << "weilin " << err.value() << ", message: " << err.message();
			close(true);
			return;
		}

		for (size_t byte_pos = 0; byte_pos < bytes_transferred; ++byte_pos)
		{
			const ConsumerRetStatus consume_ret = m_recv_packet_sp->Consume(m_once_recv_str[byte_pos]);

			if (consume_ret == CONSUM_RET_SUCCESS)
			{
				LOG(INFO) << FILE_LINE_FUNCTION << "weilin " << "CONSUM_RET_SUCCESS";
				m_recv_packet_sp->PrintPacket();
				// Hand the finished packet over, then start assembling the next one.
				m_channel_post_sp->PostRecvPacket(channel_core_weak_ptr(shared_from_this()), m_recv_packet_sp);
				m_recv_packet_sp = PacketFactory::Instance()->GetPackect(m_channel_type, PACKET_REQUEST);
				continue;
			}

			if (consume_ret == CONSUM_RET_FAIL)
			{
				LOG(INFO) << FILE_LINE_FUNCTION << "weilin " << "CONSUM_RET_FAIL";
				m_recv_packet_sp->PrintPacket();
				close(true);
				return;
			}
		}

		startRecvAsync();
	}

	// Arms m_timer_sp for static_client_channel_ms_timeout milliseconds with
	// noKeepaliveTimerCb as its handler. Posted by StartAsyncRecv() for
	// channels that are not keep-alive.
	void RpcChannelCore::noKeepaliveTimer()
	{
		std::cout << FILE_LINE_FUNCTION << std::endl;

		m_timer_sp->cancel();
		m_timer_sp->expires_from_now(
			boost::posix_time::millisec(static_client_channel_ms_timeout));
		m_timer_sp->async_wait(
			boost::bind(&RpcChannelCore::noKeepaliveTimerCb, shared_from_this(), _1));
	}

	// Handler for the timer armed by noKeepaliveTimer.
	// @param err  result of the timer wait (only logged)
	// Closes the channel with a status callback whether or not the wait
	// reported an error.
	void RpcChannelCore::noKeepaliveTimerCb(const boost::system::error_code& err)
	{
		if (err)
		{
			LOG(INFO) << "disconnectTimer error: " << err.value() << ", message: " << err.message();
		}

		close(true);

		// Disabled alternatives, kept for reference.
		//
		// Close only command channels:
//		if (m_channel_type == CHANNEL_CMD)
//		{
//			close(true);
//			return;
//		}

		// Idle-timeout handling for long-lived connections:
//		boost::posix_time::ptime now_time = boost::posix_time::microsec_clock::local_time();
//		std::cout << FILE_LINE_FUNCTION << (now_time - m_now_time).total_milliseconds() << std::endl;
//		if ((now_time - m_now_time).total_milliseconds() >= m_ms_timeout)
//		{
//			close(true);
//		}
//		else
//		{
//			noKeepaliveTimer();
//		}
	}
}
